package com.nstskj.study.netty.tcp.netty.protocol.session.base;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.nstskj.study.netty.tcp.netty.protocol.endpoint.base.AbstractEndpoint;
import com.nstskj.study.netty.tcp.netty.protocol.message.in.msg.base.AbstractInputNetTcpMessage;
import com.nstskj.study.netty.tcp.netty.protocol.message.out.msg.base.AbstractOutputNetTcpMessage;
import com.nstskj.study.netty.tcp.netty.protocol.session.enums.SessionStatusEnums;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import lombok.Data;

import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Abstract session object: binds a session id to a Netty {@link Channel} and keeps the
 * per-session state (identifiers, message queues, endpoints and status).
 *
 * <p>NOTE(review): {@code @Data} generates {@code equals}/{@code hashCode}/{@code toString}
 * from the fields of this class, including the mutable ones — confirm that sessions are not
 * used as hash keys while their fields change.
 *
 * <p>NOTE(review): the class is {@link Serializable} but holds a {@link Channel} and message
 * queues — confirm whether Java serialization is actually used and whether those fields can
 * be serialized.
 *
 * @author ZhouChuGang
 * @version 1.0
 * @project nstskj-study-netty-tcp-spring
 * @date 2021/4/21 20:32
 * @Description Abstract session object
 */
@Data
public abstract class AbstractSession implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Session id.
     */
    private final String sessionId;

    /**
     * Netty channel this session writes to; excluded from JSON output.
     */
    @JsonIgnore
    private final Channel channel;

    /**
     * Queue of messages received from the device; excluded from JSON output.
     */
    @JsonIgnore
    private final BlockingQueue<AbstractInputNetTcpMessage> recMessageBlockingQueue;

    /**
     * Queue of messages waiting to be sent; excluded from JSON output.
     */
    @JsonIgnore
    private final BlockingQueue<AbstractOutputNetTcpMessage> sendMessageBlockingQueue;

    /**
     * Manufacturer id (16 bytes).
     */
    private String factoryId;

    /**
     * User id (16 bytes).
     */
    private String userId;

    /**
     * Device id (32 bytes).
     */
    private String deviceId;

    /**
     * Device version.
     */
    private String deviceVersion;

    /**
     * Endpoints of this session, keyed by endpoint address.
     */
    private final Map<Short, AbstractEndpoint> endpointMap;

    /**
     * Current session status.
     */
    private volatile SessionStatusEnums status;

    /**
     * Creates a session in the {@link SessionStatusEnums#INIT} state with empty message
     * queues and an empty endpoint map.
     *
     * @param sessionId id of this session
     * @param channel   channel used for all outbound writes of this session
     */
    public AbstractSession(String sessionId, Channel channel) {
        this.sessionId = sessionId;
        this.channel = channel;
        this.status = SessionStatusEnums.INIT;
        this.recMessageBlockingQueue = new LinkedBlockingQueue<>();
        this.sendMessageBlockingQueue = new LinkedBlockingQueue<>();
        this.endpointMap = new ConcurrentHashMap<>();
    }

    /**
     * Enqueues a received message for later processing.
     *
     * @param abstractInputNetTcpMessage the received message to enqueue
     * @return the result of {@link BlockingQueue#offer(Object)}: {@code true} if the message
     *         was added to the receive queue
     */
    public boolean addRecNetTcpMessage(AbstractInputNetTcpMessage abstractInputNetTcpMessage) {
        return this.recMessageBlockingQueue.offer(abstractInputNetTcpMessage);
    }

    /**
     * Enqueues an outbound message in the send queue.
     *
     * @param abstractOutputNetTcpMessage the outbound message to enqueue
     * @return the result of {@link BlockingQueue#offer(Object)}: {@code true} if the message
     *         was added to the send queue
     */
    public boolean addSendNetTcpMessage(AbstractOutputNetTcpMessage abstractOutputNetTcpMessage) {
        return this.sendMessageBlockingQueue.offer(abstractOutputNetTcpMessage);
    }

    /**
     * Registers an endpoint under its own address, replacing any endpoint previously stored
     * under the same address.
     *
     * @param endpoint the endpoint to register; its {@code getAddress()} value is the map key
     */
    public void addEndpoint(AbstractEndpoint endpoint) {
        this.endpointMap.put(endpoint.getAddress(), endpoint);
    }

    /**
     * Looks up an endpoint by address.
     *
     * @param address the endpoint address; may be {@code null}
     * @return the endpoint registered under {@code address}, or {@code null} if there is none
     *         or {@code address} is {@code null}
     */
    public AbstractEndpoint getEndpointByAddress(Short address) {
        // ConcurrentHashMap rejects null keys with a NullPointerException, so a null address
        // is treated as "no such endpoint" instead of crashing the lookup.
        if (address == null) {
            return null;
        }
        return this.endpointMap.get(address);
    }

    /**
     * Sends a raw byte array over the channel.
     *
     * <p>The array is wrapped rather than copied, and the future returned by
     * {@code writeAndFlush} is not inspected, so write failures are not reported to the caller.
     *
     * @param bytes the payload to send; expected to be non-null
     */
    public void sendBytes(byte[] bytes) {
        ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
        this.channel.writeAndFlush(byteBuf);
    }

    /**
     * Sends a TCP message object over the channel.
     *
     * <p>The message is handed to the channel as-is and the future returned by
     * {@code writeAndFlush} is not inspected.
     *
     * @param abstractOutputNetTcpMessage the outbound message to write and flush
     */
    public void sendOutputNetTcpMessage(AbstractOutputNetTcpMessage abstractOutputNetTcpMessage) {
        this.channel.writeAndFlush(abstractOutputNetTcpMessage);
    }

    /**
     * Closes this session; the concrete behavior is defined by subclasses.
     */
    public abstract void close();

}
